[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses#56042

Closed
AnishMahto wants to merge 11 commits into apache:master from AnishMahto:SPARK-56956-introduce-flow-data-classes

Conversation

Contributor

@AnishMahto AnishMahto commented May 21, 2026

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


What changes were proposed in this pull request?

Introduce dataclass for unresolved AutoCDC flow (AutoCdcFlow) and resolved AutoCDC flow (AutoCdcMergeFlow). Add wiring to analyze an AutoCdcFlow to an AutoCdcMergeFlow.

A small refactor was additionally made on the UnresolvedFlow and ResolvedFlow class hierarchy.

Why are the changes needed?

Support AutoCDC flow registration and analysis. AutoCDC flow execution will be supported in a future PR. Previously, an UnresolvedFlow always also represented an untyped flow: a flow whose execution type (streaming, append-once, etc.) is not yet known.

AutoCdcFlow is a specialized flow with support for only streaming flows, hence it represents a flow whose execution-type we know at construction. It is still unresolved at registration time, and needs to go through resolution to determine its position in the DAG and its input/output schemas.

Hence we introduce the intermediary child UntypedFlow for UnresolvedFlow, which all previous flows are classified as during registration. An AutoCdcFlow directly implements UnresolvedFlow (skipping UntypedFlow in its inheritance chain) because it is not untyped.
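
A minimal sketch of the hierarchy described above, assuming heavily simplified stand-in types. Only the class names come from this PR and its review; the fields, the `FlowFunctionResult(isStreaming)` shape, and the `resolve` helper are illustrative assumptions, not the code in Flow.scala. Append-once flows are left out for brevity.

```scala
// Simplified stand-ins, not the real classes from the PR.
object FlowHierarchySketch {
  // Hypothetical stand-in for the analyzed result of a flow function.
  final case class FlowFunctionResult(isStreaming: Boolean)

  // Any flow that has been registered but not yet analyzed.
  sealed trait UnresolvedFlow { def name: String }

  // A flow whose execution type is only known after resolution.
  final case class UntypedFlow(name: String) extends UnresolvedFlow

  // Execution type is known at construction, so it skips UntypedFlow.
  final case class AutoCdcFlow(name: String) extends UnresolvedFlow

  sealed trait ResolvedFlow { def name: String }
  final case class StreamingFlow(name: String) extends ResolvedFlow
  final case class CompleteFlow(name: String) extends ResolvedFlow
  final case class AutoCdcMergeFlow(name: String) extends ResolvedFlow

  // AutoCDC flows map directly to their resolved form; untyped flows
  // are typed from the analyzed query.
  def resolve(flow: UnresolvedFlow, result: FlowFunctionResult): ResolvedFlow =
    flow match {
      case AutoCdcFlow(name) => AutoCdcMergeFlow(name)
      case UntypedFlow(name) if result.isStreaming => StreamingFlow(name)
      case UntypedFlow(name) => CompleteFlow(name)
    }
}
```

In this sketch, `resolve(AutoCdcFlow("cdc"), FlowFunctionResult(isStreaming = true))` yields `AutoCdcMergeFlow("cdc")`, while an `UntypedFlow` becomes a `StreamingFlow` or a `CompleteFlow` depending on `isStreaming`.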

Does this PR introduce any user-facing change?

No, the AutoCDC feature is not released anywhere yet.

How was this patch tested?

ConnectValidPipelineSuite and AutoCdcFlowSuite

Was this patch authored or co-authored using generative AI tooling?

Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

Contributor Author

AnishMahto commented May 22, 2026

@szehon-ho

This is actually a fairly small change btw, 600 LOC is just tests. The only real logic added here is some validation on construction of an AutoCdcMergeFlow.

Comment on lines -116 to -134

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
  val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
  val resolver = microbatchSqlConf.resolver

  microbatch.schema.fieldNames
    .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
    .foreach { conflictingColumnName =>
      throw new AnalysisException(
        errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
        messageParameters = Map(
          "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
          "columnName" -> conflictingColumnName,
          "schemaName" -> "microbatch",
          "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
        )
      )
    }
}
Contributor Author

This logic and its corresponding test were removed because, by construction, the conflict is no longer possible, given how AutoCdcMergeFlow validates requireReservedPrefixAbsentInSourceColumns.
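
A rough sketch of why a construction-time check makes the later per-microbatch check redundant. Everything here is an assumption for illustration: the prefix value `__autocdc_`, the `MergeFlowSketch` class, and the use of `IllegalArgumentException` in place of Spark's `AnalysisException`. The real `requireReservedPrefixAbsentInSourceColumns` may differ in every detail.

```scala
object ReservedPrefixSketch {
  // Hypothetical reserved prefix, kept lowercase so the
  // case-insensitive comparison below is straightforward.
  val ReservedPrefix = "__autocdc_"

  // Loosely mirrors a case-sensitivity-aware resolver.
  def startsWithReserved(column: String, caseSensitive: Boolean): Boolean =
    if (caseSensitive) column.startsWith(ReservedPrefix)
    else column.toLowerCase(java.util.Locale.ROOT).startsWith(ReservedPrefix)

  final class MergeFlowSketch(val sourceColumns: Seq[String], caseSensitive: Boolean) {
    // Runs as part of the constructor, so no instance can exist
    // with a source column that uses the reserved prefix.
    sourceColumns.find(startsWithReserved(_, caseSensitive)).foreach { c =>
      throw new IllegalArgumentException(
        s"Source column '$c' uses the reserved prefix '$ReservedPrefix'")
    }
  }
}
```

Because the check runs in the constructor, any code that holds a `MergeFlowSketch` can assume its source columns are free of the reserved prefix, which is the property the removed validator used to re-check on every microbatch.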

@AnishMahto AnishMahto force-pushed the SPARK-56956-introduce-flow-data-classes branch from 5d4b9f9 to bc0c1d8 Compare May 22, 2026 20:52
Member

@szehon-ho szehon-ho left a comment

Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline comments below (streaming enforcement, SCD2 error type, typo, duplicate test, isAutoCdcFlow clarity). None blocking if streaming validation is intentionally deferred.

    flow: UnresolvedFlow,
    funcResult: FlowFunctionResult): ResolvedFlow = {
  flow match {
    case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Member

UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.

That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.

Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:

case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.

Contributor Author

So this is a good question, but we don't actually need to do anything here because:

  1. It's not possible to create an MV with an AutoCDC flow input because (a) MVs must be defined with an inline flow function, (b) AutoCDC flows must be defined as standalone flows with a target, and (c) MVs are not allowed to have multiple input flows. That means if an AutoCDC flow targets an MV, the MV necessarily has at least 2 flows and would be invalidated.
  2. AutoCDC flows are unique in that users don't actually get to define their flow functions. SDP will define the AutoCDC flow function at flow registration time, and we will define it in such a way that forces it to be streaming by construction (i.e. spark.read_stream(source)).

Eventually we will support AutoCDC once flows, which would indeed be batch flows, but that's not supported today: once = false always, by construction.

As a middle ground though I'll also introduce a test demonstrating that AutoCDC flows cannot write to MVs in either the flow execution or flow registration PR.
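
The first argument above can be sketched as a graph-level check. This is an illustration under stated assumptions, not the validation code in SDP: `TargetKind`, `Flow`, and `invalidMvs` are hypothetical names, and the only rule modeled is "an MV may not have more than one input flow".

```scala
object MvFlowCountSketch {
  sealed trait TargetKind
  case object MaterializedView extends TargetKind
  case object StreamingTable extends TargetKind

  // Hypothetical flow record: a name and the dataset it writes to.
  final case class Flow(name: String, target: String)

  // Names of materialized views that are fed by more than one flow.
  def invalidMvs(targets: Map[String, TargetKind], flows: Seq[Flow]): Set[String] =
    flows.groupBy(_.target).collect {
      case (target, fs)
          if targets.get(target).contains(MaterializedView) && fs.size > 1 =>
        target
    }.toSet
}
```

An MV always brings its own inline flow, so a standalone AutoCDC flow that targets it raises the count to at least two and the MV is reported as invalid. A streaming table with several input flows is not flagged by this rule.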

Comment thread sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala Outdated
Comment thread sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala Outdated
@AnishMahto AnishMahto requested a review from szehon-ho May 22, 2026 22:44
Member

@szehon-ho szehon-ho left a comment

Nice foundation PR — the UnresolvedFlow / UntypedFlow split and moving reserved-prefix validation to AutoCdcMergeFlow construction look solid. A few inline nits below; all are non-blocking and can be tracked in the execution/registration follow-ups.

* columns that the AutoCDC MERGE engine projects onto the target table. Downstream
* dependencies in the pipeline see this augmented schema.
*/
override val schema: StructType = {
Member

AutoCdcMergeFlow overrides schema to the augmented target-facing schema (column selection + CDC metadata), but still inherits ResolvedFlow.load() which returns the raw CDF df. If any code path reads this flow as an Input and assumes load() matches schema, that could be surprising.

Probably fine if AutoCDC outputs are always materialized to tables first, but worth confirming when the execution PR wires this up — either override load() or document that the augmented schema is for inference/planning only. Non-blocking.

Contributor Author

This is a great callout.

Since AutoCDC flows must write to a streaming table (MV, temp view, persisted view are all invalid targets), AutoCdcMergeFlow.load isn't actually ever called.

But the comment is right that without an override, the inherited AutoCdcMergeFlow.load implementation is incorrect. Regardless of whether it's called at runtime today, we should make sure the contract is correct.

Added a meaningful override and also left a comment explaining this. I also added tests to demonstrate that AutoCDC flows cannot write to MV/persisted view/temp view.
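
To make the contract issue concrete, here is one possible shape of the problem and a fix, using stand-in types. `Frame`, the column-name-only `schema`, and the body of `load()` are all assumptions for illustration; the actual override added in this PR is not shown in this thread and may behave differently.

```scala
object LoadContractSketch {
  // Stand-in for a DataFrame: only the column names matter here.
  final case class Frame(columns: Seq[String]) {
    def select(cols: Seq[String]): Frame = Frame(cols.filter(columns.contains))
    def withColumn(col: String): Frame = Frame(columns :+ col)
  }

  trait ResolvedFlow {
    def df: Frame
    def schema: Seq[String] = df.columns
    // Default contract: load() returns data shaped like `schema`.
    def load(): Frame = df
  }

  final class AutoCdcMergeFlowSketch(
      val df: Frame,
      selected: Seq[String],
      metadataCol: String) extends ResolvedFlow {
    // Augmented, target-facing schema: selected columns plus CDC metadata.
    override val schema: Seq[String] = selected :+ metadataCol
    // Without this override, load() would return the raw change feed,
    // whose columns do not match the augmented schema.
    override def load(): Frame = df.select(selected).withColumn(metadataCol)
  }
}
```

The point of the sketch is only that once `schema` is overridden, an inherited `load()` silently breaks the "load() matches schema" expectation unless it is overridden as well.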

As a side note, I think this is a great example of why good inheritance is difficult to get right and more often than not adds unnecessary coupling 😛.

@AnishMahto AnishMahto force-pushed the SPARK-56956-introduce-flow-data-classes branch from d8410ec to ccd031f Compare May 23, 2026 22:37
Member

@szehon-ho szehon-ho left a comment

LGTM — solid foundation PR for AutoCDC flow registration and analysis. The UnresolvedFlow / UntypedFlow split, validation at AutoCdcMergeFlow construction, and graph-level guardrails look good. Execution and registration wiring can follow in later PRs.

@AnishMahto AnishMahto force-pushed the SPARK-56956-introduce-flow-data-classes branch from ccd031f to 9e34887 Compare May 25, 2026 05:17
@AnishMahto
Contributor Author

OOM for org.apache.spark.util.collection.SorterSuite is unrelated to these changes, and likely transient.

@cloud-fan
Contributor

yea the test failure is unrelated, thanks, merging to master/4.x/4.2 (most CDC work is already in 4.2)

@cloud-fan cloud-fan closed this in 49aaed2 May 26, 2026
cloud-fan pushed a commit that referenced this pull request May 26, 2026
Closes #56042 from AnishMahto/SPARK-56956-introduce-flow-data-classes.

Authored-by: AnishMahto <anish.mahto99@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 49aaed2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request May 26, 2026
cloud-fan added a commit to cloud-fan/spark that referenced this pull request May 26, 2026
… dataclasses

### What changes were proposed in this pull request?

Follow-up cleanup of review feedback on apache#56042:

- Remove the now-dead `Scd1BatchProcessor.validateCdcMetadataColumnNotPresent`
  validator and its call site. It referenced the error class
  `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT` which the parent PR removed from
  `error-conditions.json`; the new construction-time check in
  `AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns` is the
  authoritative validator and supersedes it.
- Lift `comment: Option[String]` to the `UnresolvedFlow` trait so both
  `UntypedFlow` and `AutoCdcFlow` carry it symmetrically; previously only
  `AutoCdcFlow` had it.
- Reorder `AutoCdcFlow`'s constructor so defaulted params (`sqlConf`,
  `comment`) trail the non-defaulted ones (`origin`, `changeArgs`), allowing
  positional construction.
- Fix Scaladoc/comment text: factual wording for the keys-presence check,
  the `[[ResolvedFlow.load]]` link, the `Scd1ForeachBatchHandler` reference
  (was `Scd1ForeachBatchExec`, which does not exist), and several minor
  grammar/typography nits.
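
As an illustration of the second and third bullets, a minimal sketch with placeholder types. The parameter names `origin`, `changeArgs`, `sqlConf`, and `comment` come from the text above; their types here (`String`, `Seq[String]`, `Map[String, String]`) and the absence of any other parameters are assumptions, not the real signatures.

```scala
object FlowParamsSketch {
  trait UnresolvedFlow {
    // Declared on the trait so every unresolved flow exposes it.
    def comment: Option[String]
  }

  // Required parameters first, defaulted ones last.
  final case class AutoCdcFlow(
      origin: String,
      changeArgs: Seq[String],
      sqlConf: Map[String, String] = Map.empty,
      comment: Option[String] = None) extends UnresolvedFlow

  final case class UntypedFlow(
      origin: String,
      comment: Option[String] = None) extends UnresolvedFlow
}
```

With the defaulted parameters trailing, `AutoCdcFlow("pipeline.py:10", Seq("id"))` can be written positionally. If a defaulted parameter sat before a required one, callers would have to pass it explicitly or switch to named arguments.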

### Why are the changes needed?

Cleanup of follow-up items identified during review of the parent PR. The
dead validator is the most material: if its code path were reached, it
would throw an internal `SparkException("Cannot find main error class ...")`
instead of a user-facing `AnalysisException`. The path is unreachable
through current call patterns (the construction-time check rejects bad
schemas before this code runs), but the references should not be left
behind.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests in `AutoCdcFlowSuite`, `Scd1BatchProcessorSuite`,
`ConnectInvalidPipelineSuite`, and `ConnectValidPipelineSuite` continue to
cover the affected paths. No new tests added; the change is structural
cleanup and doc edits.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored by Claude.